[Amazon SageMaker] Amazon SageMaker Ground Truth で作成したデータをオブジェクト検出で利用可能なイメージ形式に変換してみました
1 はじめに
CX事業本部の平内(SIN)です。
Amazon SageMaker の組み込みの物体検出アルゴリズム(object-detection)では、イメージ形式のデータセットが利用可能です。
今回は、Amazon SageMaker Ground Truth(以下、Ground Truth)で作成したデータを変換して、このイメージ形式のデータセットを作成してみました。
2 Ground Truth
最初に、プライベートなプロジェクトを作成して、ごく少数ですが、データセットを作成しました。
作業が完了すると、Output dataset locationに示されるバケットにデータが保存されます。
生成されるデータは、output.manifestというファイルです。
output.manifestは、各画像ごと1行のアノテーション情報となっており、1行を見やすく展開すると以下のようになります。
output.manifest
{ "source-ref": "s3://sagemaker-working-bucket-001/GroundTruth-Input/AHIRU-1586397259091391.jpg", "AHIRU-Project": { "annotations": [ { "class_id": 0, "width": 249, "top": 210, "height": 256, "left": 209 } ], "image_size": [ { "width": 800, "depth": 3, "height": 600 } ] }, "AHIRU-Project-metadata": { "job-name": "labeling-job/ahiru-project", "class-map": { "0": "AHIRU" }, "human-annotated": "yes", "objects": [ { "confidence": 0.09 } ], "creation-date": "2020-04-09T02:45:23.185192", "type": "groundtruth/object-detection" } }
3 イメージ形式のデータ
一方、SageMakerのオブジェクト検出で使用されるイメージ形式のデータは、画像とアノテーションで構成されています。
下記は、アノテーション用のJSONファイルの一例です。fileは、画像ファイル名ですが、S3のバケットとプレフィックスまで、fit()でdata_channelsとして渡しますので、そこからの相対パスになります。また .jsonファイルは対応する画像と同じ名前である必要もあります。
class_idは、0から始まるオブジェクトクラスのインデックスです。
{ "file": "AHIRU-1586397287273306.jpg", "image_size": [ { "width": 800, "height": 600, "depth": 3 } ], "annotations": [ { "class_id": 1, "top": 218, "left": 209, "width": 263, "height": 226 } ], "categories": [ { "class_id": 0, "name": "DOG" }, { "class_id": 1, "name": "AHIRU" } ] }
4 学習データと検証データ
教師あり学習のアルゴリズムである、オブジェクト検出では、学習用のデータと、検証用のデータが必要です。
ここでは、以下のプレフィックスで保存します。
- train (学習用の画像)
- validation (検証用の画像)
- train_annotation (学習用のアノテーション)
- validation_annotation (検証用のアノテーション)
学習用と検証用として、元データを、一定の比率で分割する場合、単純に分割することができません。全てのデータに均等に全てのラベルが含まれているわけでは無いからです。
たとえば、先頭から40件目まで、Aというラベルが指定されており、41〜50件目まで、Bが指定されているとします。そして、これを単純に4:1に分割すると、1〜40と41〜50となるため、学習用データにBのデータが存在しないことになってしまいます。
この問題に対応するため、変換対象となる全てのデータから、含まれるラベルの数をカウントし、サンプル数の少ないラベルから順に、分割するようにしました。
# ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる) def getLabel(dataList): labels = {} for data in dataList: for annotation in data.annotations: label = annotation["label"] if(label in labels): labels[label] += 1 else: labels[label] = 1 # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる) labels = sorted(labels.items(), key=lambda x:x[1]) return labels labels = getLabel(dataList) # ラベルの数の少ないものから優先して分割する for i,label in enumerate(labels): # ・・・分割処理
5 コード
変換を行う、すべてコードは、以下のとおりです。以下を定義を設定して利用可能です。
- inputPath 元データ(画像データとoutput.manifestを置く)
- outputPath 出力先
- ratio 学習データと検証データの分割比率
import json import glob import os import shutil # 定義 inputPath = '/tmp/GroundTruth' outputPath = '/tmp/SageMakerImageDataSet' manifest = 'output.manifest' # 学習用と検証用の分割比率 ratio = 0.8 # 80%対、20%に分割する # 1件のJデータを表現するクラス class Data(): def __init__(self, src): # プロジェクト名の取得 for key in src.keys(): index = key.rfind("-metadata") if(index!=-1): projectName = key[0:index] # メタデータの取得 metadata = src[projectName + '-metadata'] class_map = metadata["class-map"] # 画像名の取得 self.imgFileName = os.path.basename(src["source-ref"]) self.baseName = self.imgFileName.split('.')[0] # 画像サイズの取得 project = src[projectName] image_size = project["image_size"] self.img_width = image_size[0]["width"] self.img_height = image_size[0]["height"] self.annotations = [] # アノテーションの取得 for annotation in project["annotations"]: class_id = annotation["class_id"] top = annotation["top"] left = annotation["left"] width = annotation["width"] height = annotation["height"] self.annotations.append({ "label": class_map[str(class_id)], "width": width, "top": top, "height": height, "left": left }) # 指定されたラベルを含むかどうか def exsists(self, label): for annotation in self.annotations: if(annotation["label"] == label): return True return False def store(self, dataPath, annotationPath, inputPath, labels): cls_list = [] for label in labels: cls_list.append(label[0]) jsonData = {} jsonData["file"] = self.imgFileName jsonData["image_size"] = [] jsonData["image_size"].append({ "width": self.img_width, "height": self.img_height, "depth": 3 }) jsonData["annotations"] = [] for annotation in self.annotations: cls_id = cls_list.index(annotation["label"]) jsonData["annotations"].append({ "class_id": cls_id, "top": annotation["top"], "left": annotation["left"], "width": annotation["width"], "height": annotation["height"] }) jsonData["categories"] = [] for i, cls_name in enumerate(cls_list): jsonData["categories"].append( { "class_id": i, "name": cls_name } ) # jsonの保存 with open("{}/{}.json".format(annotationPath, self.baseName), mode='w') as f: json.dump(jsonData, f) # 画像のコピー shutil.copyfile("{}/{}".format(inputPath, self.imgFileName),"{}/{}".format(dataPath, self.imgFileName)) # dataListをラベルを含むものと、含まないものに分割する def deviedDataList(dataList, label): targetList = [] unTargetList = [] for data in dataList: if(data.exsists(label)): targetList.append(data) else: unTargetList.append(data) return (targetList, unTargetList) # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる) def getLabel(dataList): labels = {} for data in dataList: for annotation in data.annotations: label = annotation["label"] if(label in labels): labels[label] += 1 else: labels[label] = 1 # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる) labels = sorted(labels.items(), key=lambda x:x[1]) return labels # 全てのJSONデータを読み込む def getDataList(inputPath, manifest): dataList = [] with open("{}/{}".format(inputPath, manifest), 'r') as f: srcList = f.read().split('\n') for src in srcList: if(src != ''): json_src = json.loads(src) dataList.append(Data(json.loads(src))) return dataList def main(): # 出力先フォルダ生成 train = "{}/train".format(outputPath) validation = "{}/validation".format(outputPath) train_annotation = "{}/train_annotation".format(outputPath) validation_annotation = "{}/validation_annotation".format(outputPath) os.makedirs(outputPath, exist_ok=True) os.makedirs(train, exist_ok=True) os.makedirs(validation, exist_ok=True) os.makedirs(train_annotation, exist_ok=True) os.makedirs(validation_annotation, exist_ok=True) # 全てのJSONデータを読み込む dataList = getDataList(inputPath, manifest) log = "全データ: {}件 ".format(len(dataList)) # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる) labels = getLabel(dataList) for i,label in enumerate(labels): log += "[{}]{}: {}件 ".format(i, label[0], label[1]) print(log) # 保存済みリスト storedList = [] log = '' # ラベルの数の少ないものから優先して分割する for i,label in enumerate(labels): log = '' log += "{} => ".format(label[0]) # dataListをラベルが含まれるものと、含まないものに分割する (targetList, unTargetList) = deviedDataList(dataList, label[0]) # 保存済みリストから、当該ラベルで既に保存済の件数をカウントする (include, notInclude) = deviedDataList(storedList, label[0]) storedCounst = len(include) # train用に必要な件数 count = int(label[1] * ratio) - storedCounst log += "{}:".format(count) # train側への保存 for i in range(count): data = targetList.pop() data.store(train, train_annotation, inputPath, labels) storedList.append(data) # validation側への保存 log += "{} ".format(len(targetList)) for data in targetList: data.store(validation, validation_annotation, inputPath, labels) storedList.append(data) dataList = unTargetList log += "残り:{}件".format(len(dataList)) print(log) main()
例えば、画像ファイルが50件で、ラベルDOGが10件、ラベルAHIRUが50件アノテーションされているデータをを変換すると、以下のようなログが出力されます。
全データ: 50件 [0]DOG: 10件 [1]AHIRU: 50件 DOG => 8:2 残り:40件 AHIRU => 30:10 残り:0件
6 オブジェクト検出
非常に限られたサンプル(データ)数ですが、とりあえず、モデルを作成してみました。DOGは、10件という事で、無理ですが、AHIRUの方は、限定的ですが、少し検出出来ている感じがします。
7 最後に
今回は、Ground Truthで作成されたデータをSageMakerで利用するためのデータに変換する作業を行ってみました。
Ground Truthでは、アノテーションの作業を外注できる仕組みとなっています。このデータを中心に、モデル作成(更新)の一連の流れを上手く作っていけたらなと妄想しています。